Skip to content

This implements the flux abstraction layer as used in RP#448

Merged
andre-merzky merged 64 commits intodevelfrom
fix/flux
Oct 30, 2025
Merged

This implements the flux abstraction layer as used in RP#448
andre-merzky merged 64 commits intodevelfrom
fix/flux

Conversation

@andre-merzky
Copy link
Member

This now covers different flux versions and also allows for multiple flux instances to work concurrently.

This code should eventually move to Rhapsody.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @andre-merzky, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring and enhancement of the Flux integration within the system. The primary goal is to provide a robust and flexible abstraction layer that can seamlessly handle various Flux versions and allow for the concurrent operation of multiple Flux instances. This change lays the groundwork for improved job management and resource utilization, particularly in environments like Rhapsody, where this code is eventually intended to be used.

Highlights

  • Flux Abstraction Layer: Introduced a comprehensive abstraction layer for Flux, supporting different Flux versions (v0 and v1) and enabling concurrent Flux instances. This includes new modules for Flux service management, version-specific helpers, and dynamic module loading.
  • Modular Flux Implementation: The previous monolithic flux.py has been refactored into a flux package, separating concerns into FluxService (for managing Flux instances), FluxHelperV0/V1 (for version-specific job interaction), and FluxModule (for dynamic Flux module loading and version detection).
  • Environment Handling Enhancements: The env_write function in radical.utils.env now includes an extend parameter, allowing more flexible handling of environment variable quoting, particularly for shell directives.
  • Profiling Tool Integration: Added a new Yappi class to radical.utils.profile for Python code profiling, including automatic generation of pstats and optional gprof2dot visualization.
  • Robustness Improvements: Enhanced the _read_file_counter function in radical.utils.ids for better resource management and made EnvProcess exit handling more robust. TypedDict initialization and setter logic were also refined.
  • Testing Updates: The integration tests for Flux have been completely rewritten to utilize the new FluxService and FluxHelper APIs, ensuring proper functionality of the new abstraction layer.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codecov
Copy link

codecov bot commented Sep 24, 2025

Codecov Report

❌ Patch coverage is 30.71298% with 379 lines in your changes missing coverage. Please review.
✅ Project coverage is 61.23%. Comparing base (bd6d571) to head (19d86b3).
⚠️ Report is 65 commits behind head on devel.

Files with missing lines Patch % Lines
src/radical/utils/flux/flux_helper_v1.py 16.04% 136 Missing ⚠️
src/radical/utils/flux/flux_helper_v0.py 21.62% 87 Missing ⚠️
src/radical/utils/flux/flux_service.py 26.19% 62 Missing ⚠️
src/radical/utils/flux/flux_module.py 48.11% 55 Missing ⚠️
src/radical/utils/profile.py 25.92% 20 Missing ⚠️
src/radical/utils/env.py 33.33% 14 Missing ⚠️
src/radical/utils/typeddict.py 72.72% 3 Missing ⚠️
src/radical/utils/ids.py 87.50% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##            devel     #448      +/-   ##
==========================================
- Coverage   61.79%   61.23%   -0.57%     
==========================================
  Files          62       66       +4     
  Lines        7133     7359     +226     
==========================================
+ Hits         4408     4506      +98     
- Misses       2725     2853     +128     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a flux abstraction layer used in the Rhapsody project, introducing support for different flux versions and enabling multiple concurrent flux instances. The implementation includes a modular flux service architecture, version-specific helper classes, and various test and utility improvements.

Key Changes

  • Replaced the old flux module with a new modular flux abstraction layer supporting version detection and concurrent instances
  • Added comprehensive flux service and helper classes for both flux v0 and v1
  • Improved test infrastructure with better process handling and error reporting

Reviewed Changes

Copilot reviewed 15 out of 17 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/radical/utils/flux.py Removed old monolithic flux implementation (642 lines deleted)
src/radical/utils/flux/init.py New flux module entry point with version-aware helper selection
src/radical/utils/flux/flux_service.py New flux service for managing flux instances
src/radical/utils/flux/flux_module.py New flux module wrapper with version detection and spec utilities
src/radical/utils/flux/flux_helper_v0.py New v0-specific flux helper implementation
src/radical/utils/flux/flux_helper_v1.py New v1-specific flux helper implementation
tests/integration_tests/test_flux.py Simplified flux integration tests using new API
src/radical/utils/typeddict.py Performance improvements in TypedDict initialization
src/radical/utils/profile.py Added Yappi profiling context manager
src/radical/utils/ids.py Improved file counter handling with proper error recovery
tests/unittests/test_heartbeat.py Added helper function for better process cleanup
src/radical/utils/env.py Enhanced environment variable handling with extend support
tests/unittests/test_env.py Added conditional check for env_proc
tests/unittests/test_typeddict.py Added shebang and main execution block
src/radical/utils/init.py Updated imports for new flux classes

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +270 to +274
finally:
try:
os.close(fd)
except:
pass
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file descriptor fd may not be defined if an exception occurs before line 257. The finally block should check if fd is defined before attempting to close it.

Copilot uses AI. Check for mistakes.
def _join(proc, timeout=0.1):
proc.join(timeout=timeout)
try:
os.waitpid(test_proc.pid, os.WNOHANG)
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function uses test_proc.pid instead of proc.pid. This will cause a NameError if test_proc is not in scope, or use the wrong process ID.

Suggested change
os.waitpid(test_proc.pid, os.WNOHANG)
os.waitpid(proc.pid, os.WNOHANG)

Copilot uses AI. Check for mistakes.
self._uri = None
self._proc = None

self._log.info('%s: found flux uri: %s', self._uid, self.uri)
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log message says 'found flux uri' but is actually logging when the service is stopped and uri is None. The message should be 'stopped flux service' or similar.

Suggested change
self._log.info('%s: found flux uri: %s', self._uid, self.uri)
self._log.info('%s: stopped flux service (uri was: %s)', self._uid, self.uri)

Copilot uses AI. Check for mistakes.
Comment on lines 303 to 304
return self.__dict__['_data']

Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition checks for '_data' but no longer handles the case where '_data' is not in dict. This could cause a KeyError since the initialization logic was moved.

Suggested change
return self.__dict__['_data']
if '_data' in self.__dict__:
return self.__dict__['_data']
else:
raise AttributeError("'_data' attribute not initialized")

Copilot uses AI. Check for mistakes.
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-structured refactoring of the Flux integration, creating an abstraction layer that supports multiple Flux versions and concurrent instances. The changes are extensive, replacing the old flux.py with a new flux package containing version-specific helpers and a service class. My review has identified a few critical and high-severity issues in the new helper classes, primarily related to thread management, potential deadlocks, and correctness in the shutdown logic, which should be addressed to ensure the stability of this new abstraction.

andre-merzky and others added 4 commits September 29, 2025 11:53
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@andre-merzky
Copy link
Member Author

@mtitov : this should be ready now.

Copy link
Collaborator

@mtitov mtitov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@andre-merzky andre-merzky merged commit 8d49419 into devel Oct 30, 2025
9 checks passed
@andre-merzky andre-merzky deleted the fix/flux branch October 30, 2025 17:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants